package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @Description 读取MySQL数据源，并将查询结果发送到Flink的流中处理
 */
public class JdbcReader extends RichSourceFunction<TUser> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);

    private Connection connection = null;
    private PreparedStatement preparedStatement = null;


    /**
     * job开始执行，调用此方法创建jdbc数据源连接对象,该方法主要用于打开数据库连接，下面的ConfigKeys类是获取配置的类
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 加载JDBC驱动
        Class.forName(MysqlConfig.DRIVER_CLASS);
        // 获取数据库连接
        connection = DriverManager.getConnection(MysqlConfig.SOURCE_DRIVER_URL,MysqlConfig.SOURCE_USER,MysqlConfig.SOURCE_PASSWORD);
        preparedStatement = connection.prepareStatement("SELECT id,name,age,sex,address,createTimeSeries FROM t_user");
    }

    /**
     * 执行SQL查询并获取结果,将结果通过SourceContext发送到DataStream流管道中进行并行处理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<TUser> ctx) throws Exception {
        try {
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                int id = resultSet.getInt("id");
                String name = resultSet.getString("name");
                int age = resultSet.getInt("age");
                int sex = resultSet.getInt("sex");
                String address = resultSet.getString("address");
                long createTimeSeries = resultSet.getLong("createTimeSeries");

                TUser user = new TUser();
                user.setId(id);
                user.setName(name);
                user.setAge(age);
                user.setSex(sex);
                user.setAddress(address);
                user.setCreateTimeSeries(createTimeSeries);

                logger.info("readJDBC id:{}", id);
                //发送结果，结果是tuple1类型，1表示1个元素，可根据实际情况选择
                ctx.collect(user);
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }

    /**
     * job执行完毕后，调用此方法关闭jdbc连接源
     */
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }
}
